package com.shujia.streaming

import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Duration, Durations, StreamingContext}

object Demo1WordCount {
  def main(args: Array[String]): Unit = {
    /**
     * Spark core: SparkContext 核心数据结构：RDD
     * Spark sql: SparkSession 核心数据结构：DataFrame
     * Spark streaming: StreamingContext  核心数据结构：DStream(底层封装了RDD)
     */

    val conf = new SparkConf()
    conf.setMaster("local[2]") // 给定核数
    conf.setAppName("spark Streaming 单词统计")
    val sparkContext = new SparkContext(conf)

    //创建Spark Streaming的运行环境，和前两个模块是不一样的
    //Spark Streaming是依赖于Spark core的环境的
    //this(sparkContext: SparkContext, batchDuration: Duration)
    //Spark Streaming处理之前，是有一个接收数据的过程
    //batchDuration，表示接收多少时间段内的数据
    val streamingContext = new StreamingContext(sparkContext, Durations.seconds(5))

    //Spark Streaming程序理论上是一旦启动，就不会停止，除非报错，人为停止，停电等其他突然场景导致程序终止
    //监控一个端口号中的数据，手动向端口号中打数据
    val rids: ReceiverInputDStream[String] = streamingContext.socketTextStream("master", 12345)
    //hello world

    val wordsDS: DStream[String] = rids.flatMap(_.split(" "))
    val kvDS: DStream[(String, Int)] = wordsDS.map((_, 1))
    val resDS: DStream[(String, Int)] = kvDS.reduceByKey(_ + _)

//    val resDS: DStream[(String, Int)] = rids.flatMap(_.split(" "))
//      .map((_, 1))
//      .reduceByKey(_ + _)

    println("--------------------------------------")
    resDS.print()
    println("--------------------------------------")

    /**
     * sparkStreaming启动的方式和前两个模块启动方式不一样
     */
    streamingContext.start()
    streamingContext.awaitTermination()
    streamingContext.stop()

  }
}
